"""WebSocket server for the BigchainDB Event Stream API."""

# NOTE
#
# This module contains some functions and utilities that might belong to other
# modules. For now, I prefer to keep everything in this module. Why? Because
# those functions are needed only here.
#
# When we will extend this part of the project and we find that we need those
# functionalities elsewhere, we can start creating new modules and organizing
# things in a better way.


import json
import asyncio
import logging
import threading
from uuid import uuid4

import aiohttp
from aiohttp import web

from bigchaindb import config
from bigchaindb.events import EventTypes


logger = logging.getLogger(__name__)
POISON_PILL = 'POISON_PILL'
EVENTS_ENDPOINT = '/api/v1/streams/valid_tx'


def _multiprocessing_to_asyncio(in_queue, out_queue, loop):
    """Bridge between a synchronous multiprocessing queue
    and an asynchronous asyncio queue.

    Args:
        in_queue (multiprocessing.Queue): input queue
        out_queue (asyncio.Queue): output queue
    """

    while True:
        value = in_queue.get()
        loop.call_soon_threadsafe(out_queue.put_nowait, value)


class Dispatcher:
    """Dispatch events to websockets.

    This class implements a simple publish/subscribe pattern.
    """

    def __init__(self, event_source):
        """Create a new instance.

        Args:
            event_source: a source of events. Elements in the queue
            should be strings.
        """

        self.event_source = event_source
        self.subscribers = {}

    def subscribe(self, uuid, websocket):
        """Add a websocket to the list of subscribers.

        Args:
            uuid (str): a unique identifier for the websocket.
            websocket: the websocket to publish information.
        """

        self.subscribers[uuid] = websocket

    @asyncio.coroutine
    def publish(self):
        """Publish new events to the subscribers."""

        while True:
            event = yield from self.event_source.get()
            str_buffer = []

            if event == POISON_PILL:
                return

            if isinstance(event, str):
                str_buffer.append(event)

            elif event.type == EventTypes.BLOCK_VALID:
                block = event.data

                for tx in block['block']['transactions']:
                    asset_id = tx['id'] if tx['operation'] == 'CREATE' else tx['asset']['id']
                    data = {'block_id': block['id'],
                            'asset_id': asset_id,
                            'tx_id': tx['id']}
                    str_buffer.append(json.dumps(data))

            for _, websocket in self.subscribers.items():
                for str_item in str_buffer:
                    websocket.send_str(str_item)


@asyncio.coroutine
def websocket_handler(request):
    """Handle a new socket connection."""

    logger.debug('New websocket connection.')
    websocket = web.WebSocketResponse()
    yield from websocket.prepare(request)
    uuid = uuid4()
    request.app['dispatcher'].subscribe(uuid, websocket)

    while True:
        # Consume input buffer
        msg = yield from websocket.receive()
        if msg.type == aiohttp.WSMsgType.ERROR:
            logger.debug('Websocket exception: %s', websocket.exception())
            return


def init_app(event_source, *, loop=None):
    """Init the application server.

    Return:
        An aiohttp application.
    """

    dispatcher = Dispatcher(event_source)

    # Schedule the dispatcher
    loop.create_task(dispatcher.publish())

    app = web.Application(loop=loop)
    app['dispatcher'] = dispatcher
    app.router.add_get(EVENTS_ENDPOINT, websocket_handler)
    return app


def start(sync_event_source, loop=None):
    """Create and start the WebSocket server."""

    if not loop:
        loop = asyncio.get_event_loop()

    event_source = asyncio.Queue(loop=loop)

    bridge = threading.Thread(target=_multiprocessing_to_asyncio,
                              args=(sync_event_source, event_source, loop),
                              daemon=True)
    bridge.start()

    app = init_app(event_source, loop=loop)
    aiohttp.web.run_app(app,
                        host=config['wsserver']['host'],
                        port=config['wsserver']['port'])
